Working with Large Files on EMR, and Sorting MapReduce Output by Value

On the "No space left on device" error on EC2, and how to sort by value when post-processing MapReduce output.

The "No space left on device" problem

The EMR cluster runs on m3.xlarge instances, yet downloading an 8 GB archive fails with "No space left on device", let alone the 30 GB file we actually need. Let's look at the current disk layout.

[hadoop@ip-172-31-24-108 ~]$ df -h
Filesystem Size Used Avail Use% Mounted on
devtmpfs 7.4G 28K 7.4G 1% /dev
tmpfs 7.4G 0 7.4G 0% /dev/shm
/dev/xvda1 9.8G 8.2G 1.5G 85% /
/dev/xvdb1 5.0G 34M 5.0G 1% /emr
/dev/xvdb2 33G 334M 33G 2% /mnt
/dev/xvdc 38G 35M 38G 1% /mnt1
[hadoop@ip-172-31-24-108 ~]$ sudo file -s /dev/xvdc
/dev/xvdc: SGI XFS filesystem data (blksz 4096, inosz 256, v2 dirs)
[hadoop@ip-172-31-24-108 ~]$ lsblk
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
xvdb 202:16 0 37.5G 0 disk
├─xvdb1 202:17 0 5G 0 part /emr
└─xvdb2 202:18 0 32.5G 0 part /mnt
xvdc 202:32 0 37.5G 0 disk /mnt1
xvda1 202:1 0 10G 0 disk /

The disk /dev/xvdc is 37.5G, easily large enough for our file; it already has a MOUNTPOINT and a filesystem built on it (SGI XFS, per file -s), so it is ready to use.

Create a directory and mount the disk

[hadoop@ip-172-31-24-108 ~]$ sudo mkdir /localinput
[hadoop@ip-172-31-24-108 ~]$ sudo mount /dev/xvdc /localinput
[hadoop@ip-172-31-24-108 ~]$ cd /localinput/

Copy the file

[hadoop@ip-172-31-24-108 localinput]$ aws s3 cp s3://95869/com-friendster.ungraph.txt .
download: s3://95869/com-friendster.ungraph.txt to ./com-friendster.ungraph.txt
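
To sanity-check free space before kicking off a large download, Python 3's shutil can report it (a quick sketch; the 30 GB threshold just mirrors the file size mentioned above):

import shutil

# Free space on the filesystem backing /localinput, in GiB
free_gib = shutil.disk_usage('/localinput').free / 2**30
print('free: %.1f GiB' % free_gib)
assert free_gib > 30, 'not enough room for the 30 GB input'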

Sorting MapReduce output by value

The input is (key, frequency) pairs. The idea: in the mapper, swap key and value to emit (frequency, key); set the sort comparator to numerical descending order; then have the reducer swap back and emit (key, frequency) in the normal order.
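
In plain Python the swap-sort-swap idea looks like this (toy data, purely illustrative):

pairs = [('a', 3), ('b', 10), ('c', 1)]   # (key, frequency)
swapped = [(f, k) for k, f in pairs]      # mapper: swap to (frequency, key)
swapped.sort(reverse=True)                # sort: numerical descending order
result = [(k, f) for f, k in swapped]     # reducer: swap back to (key, frequency)
# result == [('b', 10), ('a', 3), ('c', 1)]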

Hadoop batch + Java

In Java this is quite simple: set up the job configuration in main and write a Comparator.
Comparator

import java.nio.ByteBuffer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparator;

public static class IntComparator extends WritableComparator {
    public IntComparator() {
        super(IntWritable.class);
    }

    // Deserialize the raw bytes of the two IntWritable keys and compare
    // them in reverse, so keys sort in numerically descending order.
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        Integer v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
        Integer v2 = ByteBuffer.wrap(b2, s2, l2).getInt();
        return v2.compareTo(v1);
    }
}

Configuration

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setNumReduceTasks(1);                         // a single reducer yields one globally sorted output
    job.setReducerClass(MyReducer.class);
    job.setSortComparatorClass(IntComparator.class);  // plug in the descending comparator above
    job.setOutputKeyClass(IntWritable.class);         // the key is now the frequency
    job.setOutputValueClass(Text.class);              // the value is the original key
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

Hadoop streaming + Python

EMR Streaming + Python is just as easy. When adding the step, pass these extra arguments:

-D mapreduce.job.reduces=1
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
-D mapred.text.key.comparator.options=-nr
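
For reference, a minimal mapper/reducer pair for this setup (a sketch, not the exact job from the post; the tab-separated input and the names mapper.py / reducer.py are assumptions):

#!/usr/bin/env python
# mapper.py -- emit (frequency, key) so the framework sorts on frequency
import sys

for line in sys.stdin:
    key, freq = line.rstrip('\n').split('\t')
    print('%s\t%s' % (freq, key))

#!/usr/bin/env python
# reducer.py -- swap back to (key, frequency) for the final output
import sys

for line in sys.stdin:
    freq, key = line.rstrip('\n').split('\t')
    print('%s\t%s' % (key, freq))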

Spark

Spark is easier still. Assuming calRdd holds the computed (key, frequency) pairs, all it takes is:

# Sort the nodes in decreasing order of their degrees
sortRdd = calRdd.map(lambda x: (x[1], x[0])).sortByKey(ascending=False).map(lambda x: (x[1], x[0]))
# Take the 100 nodes with the highest degrees
sortRdd.take(100)
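
If only the top-100 is needed rather than a fully sorted RDD, takeOrdered avoids sorting everything; a one-liner under the same calRdd assumption:

# Top-100 (key, frequency) pairs by descending frequency, without a full sort
top100 = calRdd.takeOrdered(100, key=lambda kv: -kv[1])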
